package drds.plus.datanode.configuration;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import drds.plus.common.lifecycle.AbstractLifecycle;
import drds.plus.common.lifecycle.Lifecycle;
import drds.plus.common.utils.TStringUtil;
import drds.plus.datanode.api.DatasourceManager;
import drds.plus.datanode.fetcher.DataSourceFetcher;
import drds.plus.datanode.fetcher.DataSourceFetcherImpl;
import drds.plus.datanode.select.AbstractSelector;
import drds.plus.datanode.select.OneSelector;
import drds.plus.datanode.select.RuntimeWritableSelector;
import drds.plus.datanode.select.Selector;
import drds.plus.datanode.select.equity_manager_select.EquityManager;
import drds.plus.datanode.select.equity_manager_select.EquityManagerPrioritySelector;
import drds.plus.datasource.api.DataSource;
import drds.plus.datasource.configuration.DatabaseStatus;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

import java.sql.SQLException;
import java.util.*;

/**
 * 一个ConfigManager对应一个TGroupDataSource，
 * 主要用于将根据Group的dataID取得的对应配置字符串信（比如db0:rwp1q1i0, db1:rwp0q0i1），
 * 转化为真正的Group层的配置体系结构：一个Group层挂着两个Atom db0 与 db1 ， 则我们使用一个 Map<String,
 * DataSourceWrapper> 来表示 其中的String 为每个Atom DS 的dbKey
 * ，DataSourceWrapper 为经过封装的TAtomDataSource
 * ---这里需要解释一下，为什么不直接使用AtomDataSource？因为每个AtomDataSource还有相应的权重和优先级信息 因此，需要***方法
 * 其中，配置的每一个Atom DataSource也只是用Atom
 * 的dbKey表示，因此，我们还需要根据此dbKey取得Atom的配置信息，并且将它封装成一个AtomDataSource对象。 因此需要***方法
 * 有了这个map能根据dbKey迅速的找到对应的Datasource也是不够的，我们的Group层应该是对应用透明的，
 * 因此，当我们的读写请求进来时，Group层应该能够根据配置的权重和优先级，自动的选择一个合适的DB上进行读写，
 * 所以，我们还需要将配置信息生成一个DBSelector来自动的完成根据权重、优先级选择合适的目标库 因此，需要***方法
 */
@Slf4j
public class DataNodeConfigurationManager extends AbstractLifecycle implements Lifecycle {

    public final DatasourceManager datasourceManager;
    public Map<String, DataSourceWrapper> dataSourceIdToDataSourceWrapperMap = new HashMap<String, DataSourceWrapper>();
    private volatile DataNodeExtraConfiguration dataNodeExtraConfiguration = new DataNodeExtraConfiguration();
    /**
     * 不能在TGroupDataSource或TGroupConnection或其他地方把DBSelector做为一个字段保存下来，
     * 否则db权重配置变了之后无法使用最新的权重配置
     */
    private volatile Selector readSelectorWrapper;
    private volatile Selector writeSelectorWrapper;
    private volatile Selector runtimeWritableAtomSelectorWrapper;

    public DataNodeConfigurationManager(DatasourceManager datasourceManager) {
        this.datasourceManager = datasourceManager;

    }

    /**
     * 将给定的k 优先级 加入这个优先级对应的V comparativeList 里面。 ----因为可能有多个DS具有相同的优先级 ---add by
     * mazhidan.pt
     */
    private static <K, V> void add2LinkedListMap(Map<K, List<V>> m, K key, V value) {
        // 从Map中先取出这个优先级的List
        List<V> c = m.get(key);
        // 如果为空，则new一个
        if (c == null) {
            c = new LinkedList<V>();
            m.put(key, c);
        }
        // 不为空，在后面add()
        c.add(value);
    }


    public static DataSourceWrapper getDataSourceWrapper(String dataSourceId, String weightString, int index, DataSourceFetcher dataSourceFetcher) throws SQLException {
        // 如果多个group复用一个真实dataSource，会造成所有group引用
        // 这个dataSource的配置 会以最后一个dataSource的配置为准
        DataSource dataSource = dataSourceFetcher.getDataSource(dataSourceId);
        DataSourceWrapper dataSourceWrapper = new DataSourceWrapper(dataSourceId, weightString, dataSource);
        return dataSourceWrapper;
    }

    public static List<DataSourceWrapper> buildDataSourceWrapperList(String dataSourceIdAndWeightStringArrayString, final DataSourceFetcher dataSourceFetcher) {
        final String[] dataSourceIdAndWeightStringArray = dataSourceIdAndWeightStringArrayString.split(","); // 逗号分隔：db0:rwp1q1i0,
        // db1:rwp0q0i1
        List<DataSourceWrapper> dataSourceWrapperList = new ArrayList<DataSourceWrapper>(dataSourceIdAndWeightStringArray.length);
        for (int i = 0; i < dataSourceIdAndWeightStringArray.length; i++) {
            final int j = i;
            final String[] dataSourceIdAndWeightString = dataSourceIdAndWeightStringArray[j].split(":"); // 冒号分隔：db0:rwp1q1i0
            final String dataSourceId = dataSourceIdAndWeightString[0].trim();
            String weightString = dataSourceIdAndWeightString.length == 2 ? dataSourceIdAndWeightString[1] : null;
            try {
                dataSourceWrapperList.add(getDataSourceWrapper(dataSourceId, weightString, j, dataSourceFetcher));
            } catch (Exception e) {
                throw new RuntimeException("init ds error! datasource key is " + dataSourceId, e);
            }
        }

        return dataSourceWrapperList;
    }

    private static EquityManager createEquityDbManager(List<DataSourceWrapper> dataSourceWrapperList, boolean isRead, DataNodeExtraConfiguration dataNodeExtraConfiguration) {
        Map<String, DataSourceWrapper> dataSourceIdToDataSourceWrapperMap = new HashMap<String, DataSourceWrapper>(dataSourceWrapperList.size());
        Map<String, Integer> dataSourceIdToWeightMap = new HashMap<String, Integer>(dataSourceWrapperList.size());
        for (DataSourceWrapper dataSourceWrapper : dataSourceWrapperList) {
            String dataSourceId = dataSourceWrapper.getDataSourceId();
            dataSourceIdToDataSourceWrapperMap.put(dataSourceId, dataSourceWrapper);
            dataSourceIdToWeightMap.put(dataSourceId, isRead ? dataSourceWrapper.getWeight().readWeight : dataSourceWrapper.getWeight().writeWeight);
        }
        EquityManager equityManager = new EquityManager(dataSourceIdToDataSourceWrapperMap, dataSourceIdToWeightMap, dataNodeExtraConfiguration);
        return equityManager;
    }

    /**
     * 根据当前的读写状态，检查数据源是否可用，数据源分两种：TAtomDataSource和普通的数据源(如DBCP数据源) 新添加一个数据源 druid
     *
     * @param dataSourceWrapper 要检查的数据源
     * @param isRead            是对数据源进行读操作(isRead=true)，还是写操作(isRead=false)
     * @return 普通的数据源不管当前的读写状态是什么，总是可用的，返回true。 TAtomDataSource如果当前的状态是NA返回false,
     * 否则根据WR状态以及isRead的值决定
     */
    public static boolean isDataSourceAvailable(DataSourceWrapper dataSourceWrapper, boolean isRead) {
        DataSource dataSource = dataSourceWrapper.getDataSource();
        DatabaseStatus dataBaseStatus = dataSource.getDatabaseStatus();
        if (dataBaseStatus.isNaStatus()) {
            return false;
        }

        if (dataBaseStatus.isRstatus() && isRead) {
            return true;
        }
        return dataBaseStatus.isWstatus() && !isRead;

    }

    /**
     * 从Diamond配置中心提取信息，构造TAtomDataSource、构造有优先级信息的读写DBSelector
     */
    public void doInit() {

        String dsWeightCommaStr = null;// globalHandler.getData();
        String extraConfig = null;// extraHandler.getNullableData(datasourceManager.getConfigReceiveTimeout(),ConfigDataHandler.FIRST_CACHE_THEN_SERVER_STRATEGY);
        if (extraConfig != null) {
            parseExtraConfig(extraConfig);
        }
        parse(dsWeightCommaStr);

    }


    public DataSource initDataSource(String applicationId, String datasourceId) {
        DataSource dataSource = new DataSource();
        try {
            dataSource.init(applicationId, datasourceId);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return dataSource;
    }

    // configInfo样例: db1:rw, db2:readWeight, db3:readWeight
    private synchronized void parse(String dataSourceIdAndWeightStringArrayString) {
        List<DataSourceWrapper> dataSourceWrapperList = buildDataSourceWrapperList(dataSourceIdAndWeightStringArrayString);
        resetByDataSourceWrapper(dataSourceWrapperList);
    }

    /**
     * extraConfig is a json format string,include table dataSourceIndex relation or
     * sql_process dataSourceIndex relation or default go nonspi db configuration. example:
     * {sqlDsIndex: { 0:[sql1,sql2,sql3], 1:[sql0], 2:[sql4] }, tabDsIndex: {
     * 0:[table1,table2] 1:[table3,table4] }, defaultMain:true}
     **/
    private synchronized void parseExtraConfig(String extraConfig) {
        if (extraConfig == null) {
            this.dataNodeExtraConfiguration.getSqlForbidSet().clear();
            this.dataNodeExtraConfiguration.getSqlDsIndexMap().clear();
            this.dataNodeExtraConfiguration.getTableDsIndexMap().clear();
            this.dataNodeExtraConfiguration.setDefaultMain(false);
        }
        try {
            JSONObject obj = JSONObject.parseObject(extraConfig);
            if (obj.containsKey("sqlForbid")) {
                Set<String> tempSqlForbidSet = new HashSet<String>();
                JSONArray array = obj.getJSONArray("sqlForbid");
                for (int i = 0; i < array.size(); i++) {
                    String sql = array.getString(i);
                    String nomalSql = TStringUtil.fillTabWithSpace(sql.trim().toLowerCase());
                    if (nomalSql != null) {
                        tempSqlForbidSet.add(nomalSql);
                    }
                }
                this.dataNodeExtraConfiguration.setSqlForbidSet(tempSqlForbidSet);
            } else {
                this.dataNodeExtraConfiguration.getSqlForbidSet().clear();
            }

            if (obj.containsKey("sqlDsIndex")) {
                Map<String, String> tempSqlDsIndexMap = new HashMap<String, String>();
                JSONObject sqlDsIndex = obj.getJSONObject("sqlDsIndex");
                Iterator it = sqlDsIndex.keySet().iterator();
                while (it.hasNext()) {
                    String key = String.valueOf(it.next()).trim();
                    String index = key;// Integer.valueOf(key);
                    JSONArray array = sqlDsIndex.getJSONArray(key);
                    for (int i = 0; i < array.size(); i++) {
                        String sql = array.getString(i);
                        String nomalSql = TStringUtil.fillTabWithSpace(sql.trim().toLowerCase());
                        if (tempSqlDsIndexMap.get(nomalSql) == null) {
                            tempSqlDsIndexMap.put(nomalSql, index);
                        } else {
                            // have a nice log
                            throw new RuntimeException("sql_process can not be route to different dataSourceIndex:" + sql);
                        }
                    }
                }
                this.dataNodeExtraConfiguration.setSqlDsIndexMap(tempSqlDsIndexMap);
            } else {
                this.dataNodeExtraConfiguration.getSqlDsIndexMap().clear();
            }

            if (obj.containsKey("tabDsIndex")) {
                Map<String, String> tempTabDsIndexMap = new HashMap<String, String>();
                JSONObject sqlDsIndex = obj.getJSONObject("tabDsIndex");
                Iterator it = sqlDsIndex.keySet().iterator();
                while (it.hasNext()) {
                    String key = String.valueOf(it.next()).trim();
                    String index = key;// Integer.valueOf(key);
                    JSONArray array = sqlDsIndex.getJSONArray(key);
                    for (int i = 0; i < array.size(); i++) {
                        String table = array.getString(i);
                        String nomalTable = table.trim().toLowerCase();
                        if (tempTabDsIndexMap.get(nomalTable) == null) {
                            tempTabDsIndexMap.put(nomalTable, index);
                        } else {
                            // have a nice log
                            throw new RuntimeException("table can not be route to different dataSourceIndex:" + table);
                        }
                    }
                }
                this.dataNodeExtraConfiguration.setTableDsIndexMap(tempTabDsIndexMap);
            } else {
                this.dataNodeExtraConfiguration.getTableDsIndexMap().clear();
            }

            if (obj.containsKey("defaultMain")) {
                this.dataNodeExtraConfiguration.setDefaultMain(obj.getBoolean("defaultMain"));
            } else {
                this.dataNodeExtraConfiguration.setDefaultMain(false);
            }

        } catch (JSONException e) {
            throw new RuntimeException("datanode extraConfig is not json valid string:" + extraConfig, e);
        }
    }

    /**
     * 警告: 逗号的位置很重要，要是有连续的两个逗号也不要人为的省略掉， 数据库的个数 =
     * 逗号的个数+1，用0、1、2...编号，比如"db1,,db3"，实际上有3个数据库，
     * 业务层通过传一个ThreadLocal进来，ThreadLocal中就是这种索引编号。
     */
    private List<DataSourceWrapper> buildDataSourceWrapperList(String dataSourceIdAndWeightStringArrayString) {
        log.info("[buildDataSourceWrapperList]dsWeightCommaStr=" + dataSourceIdAndWeightStringArrayString);
        this.datasourceManager.setDataSourceIdAndWeightStringArrayString(dataSourceIdAndWeightStringArrayString);
        if ((dataSourceIdAndWeightStringArrayString == null) || (dataSourceIdAndWeightStringArrayString = dataSourceIdAndWeightStringArrayString.trim()).length() == 0) {
            throw new RuntimeException(DatasourceManager.class.getSimpleName());
        }
        return buildDataSourceWrapperList(dataSourceIdAndWeightStringArrayString, new DataSourceFetcherImpl(this));
    }


    /**
     * 将封装好的AtomDataSource的列表，进一步封装为可以根据权重优先级随机选择模板库的DBSelector
     */
    private void resetByDataSourceWrapper(List<DataSourceWrapper> dataSourceWrapperList) {
        // 删掉已经不存在的DataSourceWrapper
        Map<String, DataSourceWrapper> newDataSourceWrapperMap = new HashMap<String, DataSourceWrapper>(dataSourceWrapperList.size());
        for (DataSourceWrapper dataSourceWrapper : dataSourceWrapperList) {
            newDataSourceWrapperMap.put(dataSourceWrapper.getDataSourceId(), dataSourceWrapper);
        }
        Map<String, DataSourceWrapper> old = this.dataSourceIdToDataSourceWrapperMap;
        this.dataSourceIdToDataSourceWrapperMap = newDataSourceWrapperMap;
        // TODO 需要考虑关闭老的DataSource对象
        old.clear();
        old = null;
        Selector r_Selector = null;
        Selector w_Selector = null;
        // 如果只有一个db，则用OneDBSelector
        if (dataSourceWrapperList.size() == 1) {
            DataSourceWrapper dsw2 = dataSourceWrapperList.get(0);
            r_Selector = new OneSelector(dsw2);
            w_Selector = r_Selector;
        } else {
            // 读写优先级Map
            Map<Integer/* 优先级 */, List<DataSourceWrapper>/* 优先级为key的DS 列表 */> rPriority2DswList = new HashMap<Integer, List<DataSourceWrapper>>();
            Map<Integer, List<DataSourceWrapper>> wPriority2DswList = new HashMap<Integer, List<DataSourceWrapper>>();
            for (DataSourceWrapper dsw1 : dataSourceWrapperList) {
                add2LinkedListMap(rPriority2DswList, dsw1.getWeight().readPriority, dsw1);
                add2LinkedListMap(wPriority2DswList, dsw1.getWeight().writePriority, dsw1);
            }
            r_Selector = createDBSelector(rPriority2DswList, true);
            w_Selector = createDBSelector(wPriority2DswList, false);
        }

        r_Selector.setReadable(true);
        w_Selector.setReadable(false);

        this.readSelectorWrapper = r_Selector;
        this.writeSelectorWrapper = w_Selector;

        if (datasourceManager.isAutoSelectWriteDataSource()) {
            runtimeWritableAtomSelectorWrapper = new RuntimeWritableSelector(dataSourceIdToDataSourceWrapperMap, dataNodeExtraConfiguration);
        }


    }

    /**
     * 根据给定的具有读写优先级及每个优先级对应的DataSource链表的Map，构造DBSelector---add by mazhidan.pt
     *
     * @param priority2DswList
     * @param isRead
     * @return
     */
    private Selector createDBSelector(Map<Integer/* 优先级 */, List<DataSourceWrapper>> priority2DswList, boolean isRead) {
        if (priority2DswList.size() == 1) { // 只有一个优先级直接使用EquityDbManager
            return createDBSelector2(priority2DswList.entrySet().iterator().next().getValue(), isRead);
        } else {
            List<Integer> priorityKeys = new LinkedList<Integer>();
            priorityKeys.addAll(priority2DswList.keySet());
            Collections.sort(priorityKeys); // 优先级从小到大排序
            EquityManager[] priorityGroups = new EquityManager[priorityKeys.size()];
            for (int i = 0; i < priorityGroups.length; i++) { // 最大的优先级放到最前面
                int priority = priorityKeys.get(priorityGroups.length - 1 - i);// 倒序
                List<DataSourceWrapper> dswList = priority2DswList.get(priority);
                // PriorityDbGroupSelector依赖EquityDbManager抛出的NoMoreDataSourceException来实现，
                // 所以这里即使只有一个ds也只能仍然用EquityDbManager
                priorityGroups[i] = createEquityDbManager(dswList, isRead, dataNodeExtraConfiguration);

            }
            return new EquityManagerPrioritySelector(priorityGroups);
        }
    }

    private AbstractSelector createDBSelector2(List<DataSourceWrapper> dswList, boolean isRead) {
        AbstractSelector dbSelector;
        if (dswList.size() == 1) {
            DataSourceWrapper dsw = dswList.get(0);
            dbSelector = new OneSelector(dsw);

        } else {
            dbSelector = createEquityDbManager(dswList, isRead, dataNodeExtraConfiguration);
        }
        return dbSelector;
    }

    /**
     * 根据是读还是写来选择对应的DBSelector---add by mazhidan.pt
     */
    public Selector getSelector(boolean isRead, boolean autoSelectWriteDataSource) {
        Selector selector = isRead ? readSelectorWrapper : writeSelectorWrapper;
        if (!isRead && autoSelectWriteDataSource) {
            // 因为所有dbSelector内部的TAtomDataSource都是指向同一个实例，如果某一个TAtomDataSource的状态改了，
            // 那么所有包含这个TAtomDataSource的dbSelector都会知道状态改变了，
            // 所以只要有一个TAtomDataSource的状态变成W，
            // 那么不管这个dbSelector是专门用于读的，还是专门用于写的，也不管是不是runtimeWritableAtomDBSelector，
            // 只要调用了hasWritableDataSource()都会返回true

            // if(!query.hasWritableDataSource())
            selector = runtimeWritableAtomSelectorWrapper;
        }
        return selector;
    }

    // 仅用于测试
    public void resetDbGroup(String configInfo) {
        try {
            parse(configInfo);
        } catch (Throwable t) {
            log.error("resetDbGroup failed:" + configInfo, t);
        }

    }

    protected void doDestroy() {
        // 关闭下层DataSource
        if (dataSourceIdToDataSourceWrapperMap != null) {
            for (DataSourceWrapper dataSourceWrapper : dataSourceIdToDataSourceWrapperMap.values()) {
                try {
                    DataSource dataSource = dataSourceWrapper.getDataSource();
                    dataSource.destroyDataSource();
                } catch (Exception e) {
                    log.error("we got exception when close datasource : " + dataSourceWrapper.getDataSourceId(), e);
                }
            }
        }

    }

    public void destroyDataSource() {
        destroy();
    }

    public Logger log() {
        return log;
    }

    private class ConfigReceiver // implements ConfigDataListener
    {

        private DataNodeConfigurationManager configManager;

        public void setConfigManager(DataNodeConfigurationManager configManager) {
            this.configManager = configManager;
        }

        public void onDataRecieved(String dataId, String data) {
            try {
                String oldData = this.configManager.datasourceManager.getDataSourceIdAndWeightStringArrayString();

                parse(data);
            } catch (Throwable t) {
                log.error("动态解析配置信息时出现错误:" + data, t);
            }
        }
    }

    private class ExtraGroupConfigReceiver
            // implements ConfigDataListener
    {

        public void onDataRecieved(String dataId, String data) {
            // LoggerInit.TDDL_DYNAMIC_CONFIG.info("receive datanode
            // extra data:" + data);
            parseExtraConfig(data);
        }
    }
}
